home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_bsddb.py < prev    next >
Text File  |  2005-10-18  |  9KB  |  292 lines

  1. #! /usr/bin/env python
  2. """Test script for the bsddb C module by Roger E. Masse
  3.    Adapted to unittest format and expanded scope by Raymond Hettinger
  4. """
  5. import os, sys
  6. import copy
  7. import bsddb
  8. import dbhash # Just so we know it's imported
  9. import unittest
  10. from test import test_support
  11. from sets import Set
  12.  
  13. class TestBSDDB(unittest.TestCase):
  14.  
  15.     def setUp(self):
  16.         self.f = self.openmethod[0](self.fname, 'c')
  17.         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
  18.         for k, v in self.d.iteritems():
  19.             self.f[k] = v
  20.  
  21.     def tearDown(self):
  22.         self.f.sync()
  23.         self.f.close()
  24.         if self.fname is None:
  25.             return
  26.         try:
  27.             os.remove(self.fname)
  28.         except os.error:
  29.             pass
  30.  
  31.     def test_getitem(self):
  32.         for k, v in self.d.iteritems():
  33.             self.assertEqual(self.f[k], v)
  34.  
  35.     def test_len(self):
  36.         self.assertEqual(len(self.f), len(self.d))
  37.  
  38.     def test_change(self):
  39.         self.f['r'] = 'discovered'
  40.         self.assertEqual(self.f['r'], 'discovered')
  41.         self.assert_('r' in self.f.keys())
  42.         self.assert_('discovered' in self.f.values())
  43.  
  44.     def test_close_and_reopen(self):
  45.         if self.fname is None:
  46.             # if we're using an in-memory only db, we can't reopen it
  47.             # so finish here.
  48.             return
  49.         self.f.close()
  50.         self.f = self.openmethod[0](self.fname, 'w')
  51.         for k, v in self.d.iteritems():
  52.             self.assertEqual(self.f[k], v)
  53.  
  54.     def assertSetEquals(self, seqn1, seqn2):
  55.         self.assertEqual(Set(seqn1), Set(seqn2))
  56.  
  57.     def test_mapping_iteration_methods(self):
  58.         f = self.f
  59.         d = self.d
  60.         self.assertSetEquals(d, f)
  61.         self.assertSetEquals(d.keys(), f.keys())
  62.         self.assertSetEquals(d.values(), f.values())
  63.         self.assertSetEquals(d.items(), f.items())
  64.         self.assertSetEquals(d.iterkeys(), f.iterkeys())
  65.         self.assertSetEquals(d.itervalues(), f.itervalues())
  66.         self.assertSetEquals(d.iteritems(), f.iteritems())
  67.  
  68.     def test_iter_while_modifying_values(self):
  69.         if not hasattr(self.f, '__iter__'):
  70.             return
  71.  
  72.         di = iter(self.d)
  73.         while 1:
  74.             try:
  75.                 key = di.next()
  76.                 self.d[key] = 'modified '+key
  77.             except StopIteration:
  78.                 break
  79.  
  80.         # it should behave the same as a dict.  modifying values
  81.         # of existing keys should not break iteration.  (adding
  82.         # or removing keys should)
  83.         fi = iter(self.f)
  84.         while 1:
  85.             try:
  86.                 key = fi.next()
  87.                 self.f[key] = 'modified '+key
  88.             except StopIteration:
  89.                 break
  90.  
  91.         self.test_mapping_iteration_methods()
  92.  
  93.     def test_iteritems_while_modifying_values(self):
  94.         if not hasattr(self.f, 'iteritems'):
  95.             return
  96.  
  97.         di = self.d.iteritems()
  98.         while 1:
  99.             try:
  100.                 k, v = di.next()
  101.                 self.d[k] = 'modified '+v
  102.             except StopIteration:
  103.                 break
  104.  
  105.         # it should behave the same as a dict.  modifying values
  106.         # of existing keys should not break iteration.  (adding
  107.         # or removing keys should)
  108.         fi = self.f.iteritems()
  109.         while 1:
  110.             try:
  111.                 k, v = fi.next()
  112.                 self.f[k] = 'modified '+v
  113.             except StopIteration:
  114.                 break
  115.  
  116.         self.test_mapping_iteration_methods()
  117.  
  118.     def test_first_next_looping(self):
  119.         items = [self.f.first()]
  120.         for i in xrange(1, len(self.f)):
  121.             items.append(self.f.next())
  122.         self.assertSetEquals(items, self.d.items())
  123.  
  124.     def test_previous_last_looping(self):
  125.         items = [self.f.last()]
  126.         for i in xrange(1, len(self.f)):
  127.             items.append(self.f.previous())
  128.         self.assertSetEquals(items, self.d.items())
  129.  
  130.     def test_set_location(self):
  131.         self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
  132.  
  133.     def test_contains(self):
  134.         for k in self.d:
  135.             self.assert_(k in self.f)
  136.         self.assert_('not here' not in self.f)
  137.  
  138.     def test_has_key(self):
  139.         for k in self.d:
  140.             self.assert_(self.f.has_key(k))
  141.         self.assert_(not self.f.has_key('not here'))
  142.  
  143.     def test_clear(self):
  144.         self.f.clear()
  145.         self.assertEqual(len(self.f), 0)
  146.  
  147.     def test__no_deadlock_first(self, debug=0):
  148.         # do this so that testers can see what function we're in in
  149.         # verbose mode when we deadlock.
  150.         sys.stdout.flush()
  151.  
  152.         # in pybsddb's _DBWithCursor this causes an internal DBCursor
  153.         # object is created.  Other test_ methods in this class could
  154.         # inadvertently cause the deadlock but an explicit test is needed.
  155.         if debug: print "A"
  156.         k,v = self.f.first()
  157.         if debug: print "B", k
  158.         self.f[k] = "deadlock.  do not pass go.  do not collect $200."
  159.         if debug: print "C"
  160.         # if the bsddb implementation leaves the DBCursor open during
  161.         # the database write and locking+threading support is enabled
  162.         # the cursor's read lock will deadlock the write lock request..
  163.  
  164.         # test the iterator interface (if present)
  165.         if hasattr(self.f, 'iteritems'):
  166.             if debug: print "D"
  167.             i = self.f.iteritems()
  168.             k,v = i.next()
  169.             if debug: print "E"
  170.             self.f[k] = "please don't deadlock"
  171.             if debug: print "F"
  172.             while 1:
  173.                 try:
  174.                     k,v = i.next()
  175.                 except StopIteration:
  176.                     break
  177.             if debug: print "F2"
  178.  
  179.             i = iter(self.f)
  180.             if debug: print "G"
  181.             while i:
  182.                 try:
  183.                     if debug: print "H"
  184.                     k = i.next()
  185.                     if debug: print "I"
  186.                     self.f[k] = "deadlocks-r-us"
  187.                     if debug: print "J"
  188.                 except StopIteration:
  189.                     i = None
  190.             if debug: print "K"
  191.  
  192.         # test the legacy cursor interface mixed with writes
  193.         self.assert_(self.f.first()[0] in self.d)
  194.         k = self.f.next()[0]
  195.         self.assert_(k in self.d)
  196.         self.f[k] = "be gone with ye deadlocks"
  197.         self.assert_(self.f[k], "be gone with ye deadlocks")
  198.  
  199.     def test_for_cursor_memleak(self):
  200.         if not hasattr(self.f, 'iteritems'):
  201.             return
  202.  
  203.         # do the bsddb._DBWithCursor _iter_mixin internals leak cursors?
  204.         nc1 = len(self.f._cursor_refs)
  205.         # create iterator
  206.         i = self.f.iteritems()
  207.         nc2 = len(self.f._cursor_refs)
  208.         # use the iterator (should run to the first yeild, creating the cursor)
  209.         k, v = i.next()
  210.         nc3 = len(self.f._cursor_refs)
  211.         # destroy the iterator; this should cause the weakref callback
  212.         # to remove the cursor object from self.f._cursor_refs
  213.         del i
  214.         nc4 = len(self.f._cursor_refs)
  215.  
  216.         self.assertEqual(nc1, nc2)
  217.         self.assertEqual(nc1, nc4)
  218.         self.assert_(nc3 == nc1+1)
  219.  
  220.     def test_popitem(self):
  221.         k, v = self.f.popitem()
  222.         self.assert_(k in self.d)
  223.         self.assert_(v in self.d.values())
  224.         self.assert_(k not in self.f)
  225.         self.assertEqual(len(self.d)-1, len(self.f))
  226.  
  227.     def test_pop(self):
  228.         k = 'w'
  229.         v = self.f.pop(k)
  230.         self.assertEqual(v, self.d[k])
  231.         self.assert_(k not in self.f)
  232.         self.assert_(v not in self.f.values())
  233.         self.assertEqual(len(self.d)-1, len(self.f))
  234.  
  235.     def test_get(self):
  236.         self.assertEqual(self.f.get('NotHere'), None)
  237.         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
  238.         self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
  239.  
  240.     def test_setdefault(self):
  241.         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
  242.         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
  243.  
  244.     def test_update(self):
  245.         new = dict(y='life', u='of', i='brian')
  246.         self.f.update(new)
  247.         self.d.update(new)
  248.         for k, v in self.d.iteritems():
  249.             self.assertEqual(self.f[k], v)
  250.  
  251.     def test_keyordering(self):
  252.         if self.openmethod[0] is not bsddb.btopen:
  253.             return
  254.         keys = self.d.keys()
  255.         keys.sort()
  256.         self.assertEqual(self.f.first()[0], keys[0])
  257.         self.assertEqual(self.f.next()[0], keys[1])
  258.         self.assertEqual(self.f.last()[0], keys[-1])
  259.         self.assertEqual(self.f.previous()[0], keys[-2])
  260.         self.assertEqual(list(self.f), keys)
  261.  
  262. class TestBTree(TestBSDDB):
  263.     fname = test_support.TESTFN
  264.     openmethod = [bsddb.btopen]
  265.  
  266. class TestBTree_InMemory(TestBSDDB):
  267.     fname = None
  268.     openmethod = [bsddb.btopen]
  269.  
  270. class TestHashTable(TestBSDDB):
  271.     fname = test_support.TESTFN
  272.     openmethod = [bsddb.hashopen]
  273.  
  274. class TestHashTable_InMemory(TestBSDDB):
  275.     fname = None
  276.     openmethod = [bsddb.hashopen]
  277.  
  278. ##         # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
  279. ##         #                                   appears broken... at least on
  280. ##         #                                   Solaris Intel - rmasse 1/97
  281.  
  282. def test_main(verbose=None):
  283.     test_support.run_unittest(
  284.         TestBTree,
  285.         TestHashTable,
  286.         TestBTree_InMemory,
  287.         TestHashTable_InMemory,
  288.     )
  289.  
  290. if __name__ == "__main__":
  291.     test_main(verbose=True)
  292.